{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "182ce5dd",
   "metadata": {},
   "source": [
    "# Loading CSV Data in SecretFlow"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8d6c44d1",
   "metadata": {},
   "source": [
    "The following code is for demonstration only. For system security reasons it is **NOT for production**; please **DO NOT** use it directly in production."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "01776225",
   "metadata": {},
   "source": [
    "This tutorial uses several examples to show how to load CSV data in SecretFlow, so that the data can then be processed and used for modeling with SecretFlow."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e7323a0a",
   "metadata": {},
   "source": [
    "## Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "c2842348",
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "68274f5f",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2023-04-13 13:45:04,665\tINFO worker.py:1538 -- Started a local Ray instance.\n"
     ]
    }
   ],
   "source": [
    "import secretflow as sf\n",
    "\n",
    "# Check the version of your SecretFlow\n",
    "print('The version of SecretFlow: {}'.format(sf.__version__))\n",
    "\n",
    "# In case you have a running secretflow runtime already.\n",
    "sf.shutdown()\n",
    "sf.init(['alice', 'bob', 'charlie'], address=\"local\", log_to_driver=True)\n",
    "alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8af24c46",
   "metadata": {},
   "source": [
    "## API Overview"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "ffd77211",
   "metadata": {},
   "source": [
    "SecretFlow provides interfaces similar to `pandas.read_csv` that read each party's CSV file into a single, unified federated DataFrame.  \n",
    "- For the horizontal scenario, use `secretflow.data.horizontal.read_csv` [API](https://www.secretflow.org.cn/docs/secretflow/en/source/secretflow.data.horizontal.html#secretflow.data.horizontal.read_csv).  \n",
    "- For the vertical scenario, use `secretflow.data.vertical.read_csv` [API](https://www.secretflow.org.cn/docs/secretflow/en/source/secretflow.data.horizontal.html#secretflow.data.vertical.read_csv).  \n",
    "\n",
    "`read_csv` reads the CSV files of multiple parties and combines them into a FedDataFrame."
   ]
  },
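  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "531f5302",
   "metadata": {},
   "source": [
    "**Build Federated Table**  \n",
    "A federated table is a virtual concept that spans multiple parties.\n",
    "1. Each party's data in a federated table is stored locally and is not allowed to leave its domain.\n",
    "2. No one other than the party that owns the data can access the data store.\n",
    "3. Every operation on a federated table is scheduled by the Driver to each Worker, and the execution instructions are passed down layer by layer until they reach the Python runtime of the Worker that owns the data. The framework ensures that data can be operated on only when Worker.device and Object.device are the same.\n",
    "4. Federated tables are designed to manage and operate on multi-party data from a central perspective.\n",
    "5. The interface is aligned with `pandas.DataFrame` to reduce the cost of working with multi-party data.\n",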
    "\n",
    "<img alt=\"vdataframe.png\" src=\"resources/vdataframe.png\" width=\"600\">  \n",
    "\n",
    "\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b6b2af40",
   "metadata": {},
   "source": [
    "## Downloading and Splitting the Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "ed65fa48",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "E0413 13:45:12.618292213  130484 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers\n"
     ]
    }
   ],
   "source": [
    "%%capture\n",
    "%%!\n",
    "wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/iris/iris.csv\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "bdf1afd6",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "alldata_df = pd.read_csv(\"./iris.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "88df4de9",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "150"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(alldata_df)"
   ]
  },
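  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "9ff06f1b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Note: `.loc` slicing is label-based and includes both endpoints, so row 70\n",
    "# ends up in both parts (71 rows for alice, 80 rows for bob).\n",
    "h_alice_df = alldata_df.loc[:70]\n",
    "h_bob_df = alldata_df.loc[70:]"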
   ]
  },
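  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "a1f4c7e2",
   "metadata": {},
   "source": [
    "`.loc` slices by label and includes both endpoints, while `.iloc` slices by position and excludes the stop, so the two are not interchangeable when dividing rows between parties. The following minimal sketch shows the difference on a small hypothetical frame (`toy_df` and the other names below are illustrative and not part of the tutorial data)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2e5d8f3",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "# Hypothetical toy frame with a default RangeIndex 0..9.\n",
    "toy_df = pd.DataFrame({'x': range(10)})\n",
    "\n",
    "# Label-based slicing: both endpoints are included, so label 5 lands in both parts.\n",
    "loc_first, loc_second = toy_df.loc[:5], toy_df.loc[5:]\n",
    "print(len(loc_first), len(loc_second))\n",
    "\n",
    "# Position-based slicing: the stop is excluded, so the two parts are disjoint.\n",
    "iloc_first, iloc_second = toy_df.iloc[:5], toy_df.iloc[5:]\n",
    "print(len(iloc_first), len(iloc_second))"
   ]
  },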
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6bd4a699",
   "metadata": {},
   "source": [
    "Save each horizontally split DataFrame to its own CSV file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "66d07e39",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save the datasets to files\n",
    "import tempfile\n",
    "\n",
    "_, h_alice_path = tempfile.mkstemp()\n",
    "_, h_bob_path = tempfile.mkstemp()\n",
    "h_alice_df.to_csv(h_alice_path, index=False)\n",
    "h_bob_df.to_csv(h_bob_path, index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "609b6f57",
   "metadata": {},
   "outputs": [],
   "source": [
    "v_alice_df = alldata_df.loc[:,['sepal_length','sepal_width']]\n",
    "v_bob_df = alldata_df.loc[:,['petal_length','petal_width','class']]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "43f0863d",
   "metadata": {},
   "source": [
    "Save each vertically split DataFrame to its own CSV file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "c51826c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save the datasets to files\n",
    "_, v_alice_path = tempfile.mkstemp()\n",
    "_, v_bob_path = tempfile.mkstemp()\n",
    "v_alice_df.to_csv(v_alice_path, index=True, index_label=\"id\")\n",
    "v_bob_df.to_csv(v_bob_path, index=True, index_label=\"id\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "24b6916a",
   "metadata": {},
   "source": [
    "## Loading CSV data: the horizontal scenario"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "713e09fe",
   "metadata": {},
   "outputs": [],
   "source": [
    "from secretflow.data.horizontal import read_csv\n",
    "from secretflow.security.aggregation.plain_aggregator import PlainAggregator\n",
    "from secretflow.security.compare.plain_comparator import PlainComparator\n",
    "from secretflow.data.split import train_test_split"
   ]
  },
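  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "da57e9e3",
   "metadata": {},
   "source": [
    "First, prepare the CSV data files of the two parties. The horizontal scenario requires both parties to have the same schema.\n",
    "- Alice: datapath (a local path accessible from alice's machine)\n",
    "- Bob: datapath (a local path accessible from bob's machine)"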
   ]
  },
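  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7a0d4ae1",
   "metadata": {},
   "source": [
    "In the horizontal scenario, data with the same schema is distributed across multiple parties, so some DataFrame operations require cross-party computation.  \n",
    "The `read_csv` interface therefore takes an `aggregator` and a `comparator`; by specifying a secure aggregator and a secure comparator, we can protect data privacy during these computations."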
   ]
  },
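  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c3d6e9a4",
   "metadata": {},
   "source": [
    "As a rough illustration of why an aggregator is needed, the sketch below assembles a global mean from per-party partial sums and counts in plain Python. It is a conceptual sketch only and does not reflect SecretFlow's implementation: `local_stats`, `aggregate_mean`, and the sample values are hypothetical. Each party shares only its partial `(sum, count)` pair rather than its rows; how well those partial values are protected depends on the aggregator that is chosen."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d4e7f0b5",
   "metadata": {},
   "outputs": [],
   "source": [
    "def local_stats(values):\n",
    "    # Each party computes a partial sum and a row count over its own data.\n",
    "    return sum(values), len(values)\n",
    "\n",
    "\n",
    "def aggregate_mean(partials):\n",
    "    # The aggregator combines the partial (sum, count) pairs into a global mean.\n",
    "    total = sum(s for s, _ in partials)\n",
    "    count = sum(n for _, n in partials)\n",
    "    return total / count\n",
    "\n",
    "\n",
    "alice_values = [5.1, 4.9, 4.7]  # hypothetical rows held by alice\n",
    "bob_values = [6.3, 5.8]  # hypothetical rows held by bob\n",
    "\n",
    "global_mean = aggregate_mean([local_stats(alice_values), local_stats(bob_values)])\n",
    "print(round(global_mean, 2))"
   ]
  },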
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "9c4ffb66",
   "metadata": {},
   "outputs": [],
   "source": [
    "path_dict = {\n",
    "    alice: h_alice_path,\n",
    "    bob: h_bob_path\n",
    "}\n",
    "\n",
    "aggregator = PlainAggregator(charlie)\n",
    "comparator = PlainComparator(charlie)\n",
    "\n",
    "hdf = read_csv(filepath=path_dict, aggregator=aggregator, comparator=comparator)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "d36b91b5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Index(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'], dtype='object')"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "hdf.columns"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e0c6a4ae",
   "metadata": {},
   "source": [
    "The resulting `hdf` is a FedDataFrame, on which we can now do some data processing.  \n",
    "For reference, see [Data preprocessing with FedDataFrame](https://www.secretflow.org.cn/docs/secretflow/en/tutorial/data_preprocessing_with_data_frame.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "8bec80d9",
   "metadata": {},
   "outputs": [],
   "source": [
    "label = hdf[\"class\"]\n",
    "data = hdf.drop(columns=\"class\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3f89bc5c",
   "metadata": {},
   "source": [
    "The resulting `data` and `label` can be passed as inputs to FLModel or SLModel for modeling."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "dfc3eb3f",
   "metadata": {},
   "source": [
    "SecretFlow provides `train_test_split` for splitting the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "9e065f08",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data, test_data = train_test_split(data, train_size=0.8, shuffle=True, random_state=1234)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "16dadfc1",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{alice: (56, 4), bob: (64, 4)} {alice: (15, 4), bob: (16, 4)}\n"
     ]
    }
   ],
   "source": [
    "print(train_data.partition_shape(),test_data.partition_shape())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "849e2e9d",
   "metadata": {},
   "source": [
    "## Loading CSV data: the vertical scenario"
   ]
  },
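  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "52ea476a",
   "metadata": {},
   "source": [
    "First, prepare the CSV data files of the two parties. In the vertical scenario the two parties' data is not required to be aligned in advance; the `read_csv` interface provides PSI (private set intersection) capability.\n",
    "- Alice: datapath (a local path accessible from alice's machine)\n",
    "- Bob: datapath (a local path accessible from bob's machine)"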
   ]
  },
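  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "75308a31",
   "metadata": {},
   "source": [
    "In the vertical scenario the parties have different schemas, but each party holds all the data for every one of its own columns, so the comparator and aggregator are no longer needed. However, the parties' data is not necessarily aligned, so it must be aligned with `PSI` when it is read.\n",
    "\n",
    "- path_dict: the data paths of the parties\n",
    "- spu: the SPU device used for the intersection\n",
    "- keys: the keys used for the intersection (intersecting on multiple columns is supported)\n",
    "- drop_keys: the ID column names to drop after the intersection"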
   ]
  },
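  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e5f8a1c6",
   "metadata": {},
   "source": [
    "The effect of the alignment step can be illustrated with plain pandas. This is only a non-private sketch of the *result* that intersection-based alignment produces (rows whose `id` appears on both sides, in matching order); it does not show how PSI protects the keys, and the two small frames and all variable names are hypothetical."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f6a9b2d7",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "# Hypothetical, partially overlapping data held by two parties.\n",
    "alice_part = pd.DataFrame({'id': [1, 2, 3, 4], 'sepal_length': [5.1, 4.9, 4.7, 4.6]})\n",
    "bob_part = pd.DataFrame({'id': [3, 4, 5], 'class': ['a', 'b', 'c']})\n",
    "\n",
    "# Keep only the ids present on both sides, then sort so rows line up across parties.\n",
    "common_ids = sorted(set(alice_part['id']) & set(bob_part['id']))\n",
    "alice_aligned = alice_part[alice_part['id'].isin(common_ids)].sort_values('id')\n",
    "bob_aligned = bob_part[bob_part['id'].isin(common_ids)].sort_values('id')\n",
    "\n",
    "# Mirror drop_keys: remove the key column once the rows are aligned.\n",
    "alice_aligned = alice_aligned.drop(columns='id').reset_index(drop=True)\n",
    "bob_aligned = bob_aligned.drop(columns='id').reset_index(drop=True)\n",
    "print(alice_aligned)\n",
    "print(bob_aligned)"
   ]
  },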
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "c0665453",
   "metadata": {},
   "outputs": [],
   "source": [
    "spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "8a7e0a93",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<secretflow.device.device.spu.SPU at 0x7f3b808347f0>"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spu"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "4b30b5e0",
   "metadata": {},
   "outputs": [],
   "source": [
    "from secretflow.data.vertical import read_csv\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "aca648d6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[2m\u001b[36m(SPURuntime pid=23157)\u001b[0m 2023-04-13 13:45:34.913 [error] [context.cc:operator():132] connect to rank=1 failed with error [external/yacl/yacl/link/transport/channel_brpc.cc:368] send, rpc failed=112, message=[E111]Fail to connect Socket{id=0 addr=127.0.0.1:44893} (0x0x55850693c900): Connection refused [R1][E112]Not connected to 127.0.0.1:44893 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:44893 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:44893 yet, server_id=0\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:36.308 [error] [context.cc:operator():132] connect to rank=0 failed with error [external/yacl/yacl/link/transport/channel_brpc.cc:368] send, rpc failed=112, message=[E111]Fail to connect Socket{id=0 addr=127.0.0.1:24875} (0x0x562301a903c0): Connection refused [R1][E112]Not connected to 127.0.0.1:24875 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:24875 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:24875 yet, server_id=0\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Init:228] bucket size set to 1048576\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Run:97] Begin sanity check for input file: /tmp/tmp0y82frfo, precheck_switch:true\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.313 [info] [csv_checker.cc:CsvChecker:121] Executing duplicated scripts: LC_ALL=C sort --buffer-size=1G --temporary-directory=/tmp --stable selected-keys.1681364737312870835 | LC_ALL=C uniq -d > duplicate-keys.1681364737312870835\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:115] End sanity check for input file: /tmp/tmp0y82frfo, size=150\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:133] Skip doing psi, because dataset has been aligned!\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:178] Begin post filtering, indices.size=150, should_sort=true\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.317 [info] [utils.cc:MultiKeySort:88] Executing sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316884224 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316884224\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.320 [info] [utils.cc:MultiKeySort:90] Finished sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316884224 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316884224, ret=0\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23518)\u001b[0m 2023-04-13 13:45:37.320 [info] [bucket_psi.cc:Run:216] End post filtering, in=/tmp/tmp0y82frfo, out=/tmp/tmp0y82frfo.psi_output_94874\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Init:228] bucket size set to 1048576\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Run:97] Begin sanity check for input file: /tmp/tmp5xjv6qs8, precheck_switch:true\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.313 [info] [csv_checker.cc:CsvChecker:121] Executing duplicated scripts: LC_ALL=C sort --buffer-size=1G --temporary-directory=/tmp --stable selected-keys.1681364737313158626 | LC_ALL=C uniq -d > duplicate-keys.1681364737313158626\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:115] End sanity check for input file: /tmp/tmp5xjv6qs8, size=150\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:133] Skip doing psi, because dataset has been aligned!\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:178] Begin post filtering, indices.size=150, should_sort=true\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.316 [info] [utils.cc:MultiKeySort:88] Executing sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316796390 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316796390\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.319 [info] [utils.cc:MultiKeySort:90] Finished sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316796390 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316796390, ret=0\n",
      "\u001b[2m\u001b[36m(SPURuntime pid=23517)\u001b[0m 2023-04-13 13:45:37.319 [info] [bucket_psi.cc:Run:216] End post filtering, in=/tmp/tmp5xjv6qs8, out=/tmp/tmp5xjv6qs8.psi_output_94874\n"
     ]
    }
   ],
   "source": [
    "path_dict = {\n",
    "    alice: v_alice_path,  # a path accessible locally by alice\n",
    "    bob: v_bob_path,  # a path accessible locally by bob\n",
    "}\n",
    "# Prepare the SPU device\n",
    "spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))\n",
    "\n",
    "vdf = read_csv(path_dict, spu=spu, keys='id', drop_keys='id')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "9d01c4d7",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Index(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'], dtype='object')"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "vdf.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "7640532b",
   "metadata": {},
   "outputs": [],
   "source": [
    "label = vdf[\"class\"]\n",
    "data = vdf.drop(columns=\"class\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "cc9d4ee4",
   "metadata": {},
   "source": [
    "Here too, `train_test_split` can be used to split the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "69a79fbb",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data, test_data = train_test_split(data, train_size=0.8, shuffle=True, random_state=1234)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6640aa76",
   "metadata": {},
   "source": [
    "## Next, try it with your own CSV data"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
